import unittest

import merge_active_shadow

from tools.api_proto_plugin import type_context as api_type_context
from tools.protoxform import utils

from google.protobuf import descriptor_pb2
from google.protobuf import text_format


class MergeActiveShadowTest(unittest.TestCase):
    # Dummy type context for tests that don't care about this.
    def fake_type_context(self):
        fake_source_code_info = descriptor_pb2.SourceCodeInfo()
        source_code_info = api_type_context.SourceCodeInfo('fake', fake_source_code_info)
        return api_type_context.TypeContext(source_code_info, 'fake_package')

    # Poor man's text proto equivalence. Tensorflow has better tools for this,
    # i.e. assertProto2Equal.
    def assert_text_proto_eq(self, lhs, rhs):
        self.assertMultiLineEqual(lhs.strip(), rhs.strip())

    def testadjust_reserved_range(self):
        """adjust_reserved_range removes specified skip_reserved_numbers."""
        desc_pb_text = """
reserved_range {
  start: 41
  end: 41
}
reserved_range {
  start: 42
  end: 42
}
reserved_range {
  start: 43
  end: 44
}
reserved_range {
  start: 50
  end: 51
}
    """
        desc = descriptor_pb2.DescriptorProto()
        text_format.Merge(desc_pb_text, desc)
        target = descriptor_pb2.DescriptorProto()
        merge_active_shadow.adjust_reserved_range(target, desc.reserved_range, [42, 43])
        target_pb_text = """
reserved_range {
  start: 41
  end: 41
}
reserved_range {
  start: 50
  end: 51
}
    """
        self.assert_text_proto_eq(target_pb_text, str(target))

    def testmerge_active_shadow_enum(self):
        """merge_active_shadow_enum recovers shadow values."""
        active_pb_text = """
value {
  number: 1
  name: "foo"
}
value {
  number: 0
  name: "DEPRECATED_AND_UNAVAILABLE_DO_NOT_USE"
}
value {
  number: 3
  name: "bar"
}
reserved_name: "baz"
reserved_range {
  start: 2
  end: 3
}
    """
        active_proto = descriptor_pb2.EnumDescriptorProto()
        text_format.Merge(active_pb_text, active_proto)
        shadow_pb_text = """
value {
  number: 1
  name: "foo"
}
value {
  number: 0
  name: "wow"
}
value {
  number: 3
  name: "bar"
}
value {
  number: 2
  name: "hidden_envoy_deprecated_baz"
}
value {
  number: 4
  name: "hidden_envoy_deprecated_huh"
}
    """
        shadow_proto = descriptor_pb2.EnumDescriptorProto()
        text_format.Merge(shadow_pb_text, shadow_proto)
        target_proto = descriptor_pb2.EnumDescriptorProto()
        target_proto_dependencies = []
        merge_active_shadow.merge_active_shadow_enum(
            active_proto, shadow_proto, target_proto, target_proto_dependencies)
        target_pb_text = """
value {
  name: "foo"
  number: 1
}
value {
  name: "wow"
  number: 0
}
value {
  name: "bar"
  number: 3
}
value {
  name: "hidden_envoy_deprecated_baz"
  number: 2
}
    """
        self.assert_text_proto_eq(target_pb_text, str(target_proto))

    def testmerge_active_shadow_message_comments(self):
        """merge_active_shadow_message preserves comment field correspondence."""
        active_pb_text = """
field {
  number: 9
  name: "oneof_1_0"
  oneof_index: 0
}
field {
  number: 1
  name: "simple_field_0"
}
field {
  number: 0
  name: "oneof_2_0"
  oneof_index: 2
}
field {
  number: 8
  name: "oneof_2_1"
  oneof_index: 2
}
field {
  number: 3
  name: "oneof_0_0"
  oneof_index: 1
}
field {
  number: 4
  name: "newbie"
}
field {
  number: 7
  name: "oneof_3_0"
  oneof_index: 3
}
reserved_name: "missing_oneof_field_0"
reserved_name: "missing_oneof_field_1"
reserved_name: "missing_oneof_field_2"
oneof_decl {
  name: "oneof_0"
}
oneof_decl {
  name: "oneof_1"
}
oneof_decl {
  name: "oneof_2"
}
oneof_decl {
  name: "oneof_3"
}
    """
        active_proto = descriptor_pb2.DescriptorProto()
        text_format.Merge(active_pb_text, active_proto)
        active_source_code_info_text = """
location {
  path: [4, 1, 2, 4]
  leading_comments: "field_4"
}
location {
  path: [4, 1, 2, 5]
  leading_comments: "field_5"
}
location {
  path: [4, 1, 2, 3]
  leading_comments: "field_3"
}
location {
  path: [4, 1, 2, 0]
  leading_comments: "field_0"
}
location {
  path: [4, 1, 2, 1]
  leading_comments: "field_1"
}
location {
  path: [4, 0, 2, 2]
  leading_comments: "ignore_0"
}
location {
  path: [4, 1, 2, 6]
  leading_comments: "field_6"
}
location {
  path: [4, 1, 2, 2]
  leading_comments: "field_2"
}
location {
  path: [3]
  leading_comments: "ignore_1"
}
"""
        active_source_code_info = descriptor_pb2.SourceCodeInfo()
        text_format.Merge(active_source_code_info_text, active_source_code_info)
        shadow_pb_text = """
field {
  number: 10
  name: "hidden_envoy_deprecated_missing_oneof_field_0"
  oneof_index: 0
}
field {
  number: 11
  name: "hidden_envoy_deprecated_missing_oneof_field_1"
  oneof_index: 3
}
field {
  number: 11
  name: "hidden_envoy_deprecated_missing_oneof_field_2"
  oneof_index: 2
}
oneof_decl {
  name: "oneof_0"
}
oneof_decl {
  name: "oneof_1"
}
oneof_decl {
  name: "oneof_2"
}
oneof_decl {
  name: "some_removed_oneof"
}
oneof_decl {
  name: "oneof_3"
}
"""
        shadow_proto = descriptor_pb2.DescriptorProto()
        text_format.Merge(shadow_pb_text, shadow_proto)
        target_proto = descriptor_pb2.DescriptorProto()
        source_code_info = api_type_context.SourceCodeInfo('fake', active_source_code_info)
        fake_type_context = api_type_context.TypeContext(source_code_info, 'fake_package')
        target_proto_dependencies = []
        merge_active_shadow.merge_active_shadow_message(
            fake_type_context.extend_message(1, "foo", False), active_proto, shadow_proto,
            target_proto, target_proto_dependencies)
        target_pb_text = """
field {
  name: "oneof_1_0"
  number: 9
  oneof_index: 0
}
field {
  name: "hidden_envoy_deprecated_missing_oneof_field_0"
  number: 10
  oneof_index: 0
}
field {
  name: "simple_field_0"
  number: 1
}
field {
  name: "oneof_2_0"
  number: 0
  oneof_index: 2
}
field {
  name: "oneof_2_1"
  number: 8
  oneof_index: 2
}
field {
  name: "hidden_envoy_deprecated_missing_oneof_field_2"
  number: 11
  oneof_index: 2
}
field {
  name: "oneof_0_0"
  number: 3
  oneof_index: 1
}
field {
  name: "newbie"
  number: 4
}
field {
  name: "oneof_3_0"
  number: 7
  oneof_index: 3
}
field {
  name: "hidden_envoy_deprecated_missing_oneof_field_1"
  number: 11
  oneof_index: 4
}
oneof_decl {
  name: "oneof_0"
}
oneof_decl {
  name: "oneof_1"
}
oneof_decl {
  name: "oneof_2"
}
oneof_decl {
  name: "oneof_3"
}
oneof_decl {
  name: "some_removed_oneof"
}
    """
        target_source_code_info_text = """
location {
  path: 4
  path: 1
  path: 2
  path: 6
  leading_comments: "field_4"
}
location {
  path: 4
  path: 1
  path: 2
  path: 7
  leading_comments: "field_5"
}
location {
  path: 4
  path: 1
  path: 2
  path: 4
  leading_comments: "field_3"
}
location {
  path: 4
  path: 1
  path: 2
  path: 0
  leading_comments: "field_0"
}
location {
  path: 4
  path: 1
  path: 2
  path: 2
  leading_comments: "field_1"
}
location {
  path: 4
  path: 0
  path: 2
  path: 2
  leading_comments: "ignore_0"
}
location {
  path: 4
  path: 1
  path: 2
  path: 8
  leading_comments: "field_6"
}
location {
  path: 4
  path: 1
  path: 2
  path: 3
  leading_comments: "field_2"
}
location {
  path: 3
  leading_comments: "ignore_1"
}
"""
        self.maxDiff = None
        self.assert_text_proto_eq(target_pb_text, str(target_proto))
        self.assert_text_proto_eq(
            target_source_code_info_text, str(fake_type_context.source_code_info.proto))

    def testmerge_active_shadow_message(self):
        """merge_active_shadow_message recovers shadow fields with oneofs."""
        active_pb_text = """
field {
  number: 1
  name: "foo"
}
field {
  number: 0
  name: "bar"
  oneof_index: 2
}
field {
  number: 3
  name: "baz"
}
field {
  number: 4
  name: "newbie"
}
reserved_name: "wow"
reserved_range {
  start: 2
  end: 3
}
oneof_decl {
  name: "ign"
}
oneof_decl {
  name: "ign2"
}
oneof_decl {
  name: "some_oneof"
}
    """
        active_proto = descriptor_pb2.DescriptorProto()
        text_format.Merge(active_pb_text, active_proto)
        shadow_pb_text = """
field {
  number: 1
  name: "foo"
}
field {
  number: 0
  name: "bar"
}
field {
  number: 3
  name: "baz"
}
field {
  number: 5
  name: "hidden_envoy_deprecated_wow"
  options {
    deprecated: true
    [validate.rules] {
      string {
        max_bytes: 1024
      }
    }
    [envoy.annotations.disallowed_by_default]: true
  }
  oneof_index: 0
}
oneof_decl {
  name: "some_oneof"
}
    """
        shadow_proto = descriptor_pb2.DescriptorProto()
        text_format.Merge(shadow_pb_text, shadow_proto)
        target_proto = descriptor_pb2.DescriptorProto()
        target_proto_dependencies = []
        merge_active_shadow.merge_active_shadow_message(
            self.fake_type_context(), active_proto, shadow_proto, target_proto,
            target_proto_dependencies)
        target_pb_text = """
field {
  name: "foo"
  number: 1
}
field {
  name: "bar"
  number: 0
  oneof_index: 2
}
field {
  name: "hidden_envoy_deprecated_wow"
  number: 5
  options {
    deprecated: true
    [validate.rules] {
      string {
        max_bytes: 1024
      }
    }
    [envoy.annotations.disallowed_by_default]: true
  }
  oneof_index: 2
}
field {
  name: "baz"
  number: 3
}
field {
  name: "newbie"
  number: 4
}
oneof_decl {
  name: "ign"
}
oneof_decl {
  name: "ign2"
}
oneof_decl {
  name: "some_oneof"
}
reserved_range {
  start: 2
  end: 3
}
    """
        self.assert_text_proto_eq(target_pb_text, str(target_proto))
        self.assertEqual(target_proto_dependencies[0], 'envoy/annotations/deprecation.proto')

    def testmerge_active_shadow_message_no_shadow_message(self):
        """merge_active_shadow_message doesn't require a shadow message for new nested active messages."""
        active_proto = descriptor_pb2.DescriptorProto()
        shadow_proto = descriptor_pb2.DescriptorProto()
        active_proto.nested_type.add().name = 'foo'
        target_proto = descriptor_pb2.DescriptorProto()
        target_proto_dependencies = []
        merge_active_shadow.merge_active_shadow_message(
            self.fake_type_context(), active_proto, shadow_proto, target_proto,
            target_proto_dependencies)
        self.assertEqual(target_proto.nested_type[0].name, 'foo')

    def testmerge_active_shadow_message_no_shadow_enum(self):
        """merge_active_shadow_message doesn't require a shadow enum for new nested active enums."""
        active_proto = descriptor_pb2.DescriptorProto()
        shadow_proto = descriptor_pb2.DescriptorProto()
        active_proto.enum_type.add().name = 'foo'
        target_proto = descriptor_pb2.DescriptorProto()
        target_proto_dependencies = []
        merge_active_shadow.merge_active_shadow_message(
            self.fake_type_context(), active_proto, shadow_proto, target_proto,
            target_proto_dependencies)
        self.assertEqual(target_proto.enum_type[0].name, 'foo')

    def testmerge_active_shadow_message_missing(self):
        """merge_active_shadow_message recovers missing messages from shadow."""
        active_proto = descriptor_pb2.DescriptorProto()
        shadow_proto = descriptor_pb2.DescriptorProto()
        shadow_proto.nested_type.add().name = 'foo'
        target_proto = descriptor_pb2.DescriptorProto()
        target_proto_dependencies = []
        merge_active_shadow.merge_active_shadow_message(
            self.fake_type_context(), active_proto, shadow_proto, target_proto,
            target_proto_dependencies)
        self.assertEqual(target_proto.nested_type[0].name, 'foo')

    def testmerge_active_shadow_file_missing(self):
        """merge_active_shadow_file recovers missing messages from shadow."""
        active_proto = descriptor_pb2.FileDescriptorProto()
        shadow_proto = descriptor_pb2.FileDescriptorProto()
        shadow_proto.message_type.add().name = 'foo'
        target_proto = descriptor_pb2.DescriptorProto()
        target_proto = merge_active_shadow.merge_active_shadow_file(active_proto, shadow_proto)
        self.assertEqual(target_proto.message_type[0].name, 'foo')

    def testmerge_active_shadow_file_no_shadow_message(self):
        """merge_active_shadow_file doesn't require a shadow message for new active messages."""
        active_proto = descriptor_pb2.FileDescriptorProto()
        shadow_proto = descriptor_pb2.FileDescriptorProto()
        active_proto.message_type.add().name = 'foo'
        target_proto = descriptor_pb2.DescriptorProto()
        target_proto = merge_active_shadow.merge_active_shadow_file(active_proto, shadow_proto)
        self.assertEqual(target_proto.message_type[0].name, 'foo')

    def testmerge_active_shadow_file_no_shadow_enum(self):
        """merge_active_shadow_file doesn't require a shadow enum for new active enums."""
        active_proto = descriptor_pb2.FileDescriptorProto()
        shadow_proto = descriptor_pb2.FileDescriptorProto()
        active_proto.enum_type.add().name = 'foo'
        target_proto = descriptor_pb2.DescriptorProto()
        target_proto = merge_active_shadow.merge_active_shadow_file(active_proto, shadow_proto)
        self.assertEqual(target_proto.enum_type[0].name, 'foo')


# TODO(htuch): add some test for recursion.

if __name__ == '__main__':
    utils.load_protos(merge_active_shadow.PROTO_PACKAGES)
    unittest.main()
